package com.fourth.app.dim;


import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.fourth.app.func.DimSinkFunction;
import com.fourth.app.func.TableProcessFunction;
import com.fourth.bean.TableProcess;
import com.fourth.utils.MyKafkaUtil;
import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.connectors.mysql.table.StartupOptions;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;


import org.apache.flink.api.common.state.MapStateDescriptor;

import org.apache.flink.streaming.api.datastream.BroadcastConnectedStream;
import org.apache.flink.streaming.api.datastream.BroadcastStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;


/**
 * Flink job for the DIM layer.
 *
 * <p>Pipeline:
 * <ol>
 *   <li>The Kafka topic {@code topic_db} (main stream) is parsed into {@link JSONObject}s. Records
 *       that cannot be parsed into a JSON object are emitted to the {@code "Dirty"} side output and
 *       printed instead of being silently dropped.</li>
 *   <li>The MySQL table {@code edu_config.table_process} is read with Flink CDC (initial snapshot
 *       followed by change events) and broadcast.</li>
 *   <li>Both streams are connected and processed by {@link TableProcessFunction}; its output is
 *       written to Phoenix by the custom {@link DimSinkFunction} (JdbcSink can only target a single
 *       table).</li>
 * </ol>
 *
 * <p>The MySQL connection used for the configuration stream can be overridden with the system
 * properties {@code dim.mysql.host}, {@code dim.mysql.port}, {@code dim.mysql.username} and
 * {@code dim.mysql.password}; when unset, the previously hard-coded values are used.
 *
 * @Author CZQ
 * @Date 2022/8/18 16:07
 * @Version 1.0
 */
public class DimApp {
    public static void main(String[] args) throws Exception {
        // 1. Create the execution environment.
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // 2. Checkpointing / state backend settings. Currently disabled (presumably for local
        //    runs). Re-enabling requires importing CheckpointingMode, CheckpointConfig,
        //    RestartStrategies, Time, TimeUnit and HashMapStateBackend.
        /*
        env.enableCheckpointing(3000L, CheckpointingMode.EXACTLY_ONCE);
        env.getCheckpointConfig().setCheckpointTimeout(60 * 1000L);
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(3000L);
        env.getCheckpointConfig().enableExternalizedCheckpoints(
                CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION
        );
        env.setRestartStrategy(RestartStrategies.failureRateRestart(
                10, Time.of(1L, TimeUnit.DAYS), Time.of(3L, TimeUnit.MINUTES)
        ));
        env.setStateBackend(new HashMapStateBackend());
        env.getCheckpointConfig().setCheckpointStorage("hdfs://hadoop101:8020/edu/ck");
        System.setProperty("HADOOP_USER_NAME", "atguigu");
        */

        // 3. Read the Kafka topic_db topic as the main stream.
        String topic = "topic_db";
        String groupId = "dim_app";
        DataStreamSource<String> kafkaDS = env.addSource(MyKafkaUtil.getFlinkKafkaConsumer(topic, groupId));

        // 4. Parse each record into a JSONObject; anything that is not a JSON object goes to the
        //    "Dirty" side output so it is visible instead of being silently discarded.
        OutputTag<String> dirty = new OutputTag<String>("Dirty") {};

        SingleOutputStreamOperator<JSONObject> jsonObjDS = kafkaDS.process(new ProcessFunction<String, JSONObject>() {
            @Override
            public void processElement(String value, Context ctx, Collector<JSONObject> out) {
                JSONObject jsonObject;
                try {
                    jsonObject = JSON.parseObject(value);
                } catch (Exception e) {
                    // Malformed JSON (or JSON that is not an object): route to the side output.
                    ctx.output(dirty, value);
                    return;
                }
                if (jsonObject == null) {
                    // fastjson returns null for null input; never forward a null record downstream.
                    ctx.output(dirty, value);
                } else {
                    out.collect(jsonObject);
                }
            }
        });
        jsonObjDS.getSideOutput(dirty).print("脏数据>>>>>>>>>>");

        // 5. Read the configuration table with Flink CDC to build the configuration stream.
        //    Connection settings default to the original values and may be overridden via
        //    system properties so credentials need not live in the source code.
        MySqlSource<String> mySqlSource = MySqlSource.<String>builder()
                .hostname(System.getProperty("dim.mysql.host", "hadoop105"))
                .port(Integer.getInteger("dim.mysql.port", 3306))
                .username(System.getProperty("dim.mysql.username", "root"))
                .password(System.getProperty("dim.mysql.password", "000000"))
                .databaseList("edu_config")
                .tableList("edu_config.table_process")
                .startupOptions(StartupOptions.initial())
                .deserializer(new JsonDebeziumDeserializationSchema())
                .build();
        DataStreamSource<String> mySqlSourceDS = env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(),
                "mySqlSource");

        // 6. Turn the configuration stream into a broadcast stream. The same descriptor is handed
        //    to TableProcessFunction so it can access the broadcast map state.
        MapStateDescriptor<String, TableProcess> stateDescriptor =
                new MapStateDescriptor<>("map-state", String.class, TableProcess.class);
        BroadcastStream<String> broadcastStream = mySqlSourceDS.broadcast(stateDescriptor);

        jsonObjDS.print("kafka>>>>>>>>>>>>");

        // 7. Connect the main stream with the broadcast stream.
        BroadcastConnectedStream<JSONObject, String> connectedStream = jsonObjDS.connect(broadcastStream);

        // 8. Process the main stream using the broadcast configuration.
        SingleOutputStreamOperator<JSONObject> hbaseDS = connectedStream.process(new TableProcessFunction(stateDescriptor));
        hbaseDS.print(">>>>>>>>>>>>");

        // 9. Write the data to Phoenix. JdbcSink.sink() can only write to a single table, so a
        //    custom sink is used instead.
        hbaseDS.addSink(new DimSinkFunction());

        // 10. Launch the job.
        env.execute("DimApp");
    }
}
